From scikit-learn to Spark ML



To launch a notebook with pyspark (in local mode), run the following command in your shell:

IPYTHON_OPTS="notebook" pyspark --master local[*]
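If you are on a more recent Spark release (2.0+), IPYTHON_OPTS is no longer supported; the usual equivalent, to the best of our knowledge, is to point the PySpark driver at Jupyter:

PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook" pyspark --master local[*]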

Goals

This notebook aims at demonstrating the similarities and the main differences between two powerful Machine Learning libraries: scikit-learn and Spark ML.

The main objective is to show how simple it is to move from scikit-learn to Spark ML when larger volumes of data are needed to train and run Machine Learning workflows.

As we will see, Spark ML is largely inspired by scikit-learn's structure, so a scikit-learn user will easily pick up the Spark ML API when Big Data workflows are needed.

Structure of the notebook

In order to present the main concepts behind both libraries, we will go through a complete example that builds an entire Machine Learning workflow, showing the code for both scikit-learn and Spark ML at every step.

Dataset

We will work on the 20 Newsgroups dataset, which gathers newsgroup posts grouped into several topics (politics, sports, science, etc.). This example is drawn from one of scikit-learn's tutorials on text data: http://scikit-learn.org/stable/tutorial/text_analytics/working_with_text_data.html#loading-the-20-newsgroups-dataset.

Initial Configurations


In [1]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

Initial example


Let's start with a very simple example comparing the use of scikit-learn and Spark ML. Both libraries share the notion of Estimator/Transformer, and the way to use them is essentially the same. Two main differences though:

  • In scikit-learn, even a Transformer has the structure of an Estimator, with a fit() method that does nothing (see the small sketch after this list).
  • The result of the transformation is generally a NumPy array (or sparse matrix) in scikit-learn, whereas it is another DataFrame in Spark ML.
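To illustrate the first point, here is a minimal sketch using the same Binarizer as in the cells below: its fit() method learns nothing, so fit(X).transform(X) and fit_transform(X) give the same result.

import numpy as np
from sklearn.preprocessing import Binarizer

X = np.array([[4.9], [5.1], [6.0]])
b = Binarizer(threshold=5)
# fit() is a no-op for this stateless Transformer; it only exists to follow the Estimator API
assert (b.fit(X).transform(X) == b.fit_transform(X)).all()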
scikit-learn

In [2]:
import pandas as pd
from sklearn.datasets import load_iris
data = pd.DataFrame(data=load_iris().data, columns=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'])

In [3]:
data.head()


Out[3]:
sepal_length sepal_width petal_length petal_width
0 5.1 3.5 1.4 0.2
1 4.9 3.0 1.4 0.2
2 4.7 3.2 1.3 0.2
3 4.6 3.1 1.5 0.2
4 5.0 3.6 1.4 0.2

In [5]:
from sklearn.preprocessing import Binarizer
binarizer = Binarizer(threshold=5)
binarizer.fit_transform(data.sepal_length.values.reshape(-1, 1))


Out[5]:
array([[ 1.],
       [ 0.],
       [ 0.],
       [ 0.],
       [ 0.],
       [ 1.],
       ...,
       [ 1.],
       [ 1.],
       [ 1.]])
Spark ML

In [6]:
df = sqlContext.createDataFrame(data)

In [7]:
from pyspark.ml.feature import Binarizer
binarizer = Binarizer(threshold=5.0, inputCol='sepal_length', outputCol='sepal_length_bin')
binarizer.transform(df).show(5)


+------------+-----------+------------+-----------+----------------+
|sepal_length|sepal_width|petal_length|petal_width|sepal_length_bin|
+------------+-----------+------------+-----------+----------------+
|         5.1|        3.5|         1.4|        0.2|             1.0|
|         4.9|        3.0|         1.4|        0.2|             0.0|
|         4.7|        3.2|         1.3|        0.2|             0.0|
|         4.6|        3.1|         1.5|        0.2|             0.0|
|         5.0|        3.6|         1.4|        0.2|             0.0|
+------------+-----------+------------+-----------+----------------+
only showing top 5 rows

Load Newsgroup Data


Let's now work on the 20 NewsGroup dataset and prepare the data in both libraries.

scikit-learn

There is a scikit-learn loader for this dataset. We will convert the data and the target to pandas DataFrames.


In [9]:
# Import data
from sklearn.datasets import fetch_20newsgroups
categories = ['rec.autos', 'rec.sport.baseball', 'comp.graphics', 'comp.sys.mac.hardware', 
              'sci.space', 'sci.crypt', 'talk.politics.guns', 'talk.religion.misc']
newsgroup = fetch_20newsgroups(subset='train', categories=categories, shuffle=True, random_state=42)
print newsgroup.data[0]

# Create pandas DataFrames for values and targets
import pandas as pd
pdf_newsgroup = pd.DataFrame(data=newsgroup.data, columns=['news']) # Texts
pdf_newsgroup_target = pd.DataFrame(data=newsgroup.target, columns=['target']) # Targets


From: alizard@tweekco.uucp (A.Lizard)
Subject: Re: OTO, the Ancient Order of Oriental Templars
Organization: Tweek-Com Systems BBS, Moraga, CA (510) 631-0615
Lines: 18

Thyagi@cup.portal.com (Thyagi Morgoth NagaSiva) writes:

> "This organization is known at the present time as the Ancient
> Order of Oriental Templars.  Ordo Templi Orientis.  Otherwise:
> The Hermetic Brotherhood of Light.
> 
Does this organization have an official e-mail address these
days? (an address for any of the SF Bay Area Lodges, e.g. Thelema
would do.)
                                      93...
                                       A.Lizard

-------------------------------------------------------------------
A.Lizard Internet Addresses:
alizard%tweekco%boo@PacBell.COM        (preferred)
PacBell.COM!boo!tweekco!alizard (bang path for above)
alizard@gentoo.com (backup)
PGP2.2 public key available on request

Spark ML

In Spark ML, one often gathers all the information (data and targets) into the same DataFrame. We will therefore create a single Spark DataFrame by concatenating the two previous pandas DataFrames.


In [10]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df_newsgroup = sqlContext.createDataFrame(pd.concat([pdf_newsgroup, pdf_newsgroup_target], axis=1))
df_newsgroup.printSchema()
df_newsgroup.show(3)


root
 |-- news: string (nullable = true)
 |-- target: long (nullable = true)

+--------------------+------+
|                news|target|
+--------------------+------+
|From: alizard@twe...|     7|
|From: djk@ccwf.cc...|     1|
|From: rgonzal@gan...|     1|
+--------------------+------+
only showing top 3 rows

Train-Test Split


A Train-Test split is a common operation in Machine Learning. It means that we hold out some of the available data in a test set and treat it as if it were new data. The Machine Learning algorithm will be trained on the remaining training set, and the test set will be used to compare the predictions made on it to the ground truth, in order to measure the generalization capacity of the algorithm (its ability to perform well on new data, and not only on the data used for training).

scikit-learn

In scikit-learn, a Train-Test split is simply done with the function train_test_split from the cross_validation package.


In [11]:
from sklearn.cross_validation import train_test_split
X_train, X_test, y_train, y_test = train_test_split(newsgroup.data, newsgroup.target, train_size=0.8, random_state=42)
Spark

In Spark SQL, a more general method named randomSplit lets us randomly split any DataFrame according to the desired proportions. There is no need to separate the data from the target: both stay in the same DataFrame.


In [12]:
(df_train, df_test) = df_newsgroup.randomSplit([0.8, 0.2])
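To mirror the random_state=42 used on the scikit-learn side, an optional seed can also be passed:

(df_train, df_test) = df_newsgroup.randomSplit([0.8, 0.2], seed=42)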

Feature engineering


Feature Engineering covers all the operations applied to the data to transform, extract and select features, in order to capture as much information as possible and optimize the performance of Machine Learning algorithms.

Since most algorithms take numerical data as input, we need in our case to extract knowledge from the text and convert it into numerical features. Here are the transformations we are going to perform:

  • Tokenizing: Transform a text into a list of words
  • Term Frequency: The more frequent a term is in a document, the more likely it is to carry useful information about that document (unless it is a stop word).
  • Inverse Document Frequency: If a term appears in most of the documents, there is little chance that it will help distinguish and classify them.

In both scikit-learn and Spark ML, there are objects to perform these transformations.

  • CountVectorizer and TfidfTransformer in scikit-learn
  • Tokenizer, HashingTF and IDF in Spark ML

In both cases, these objects are used in much the same way: they all have fit() and transform() methods.

NB: The objects used are not exactly the same and do not have the same default parameters, so the results will differ. The purpose here is to show how to use Spark ML and how closely it resembles scikit-learn.
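Before diving into the library objects, here is a toy sketch of the TF-IDF idea in plain Python. It uses Spark ML's IDF convention, idf(t) = log((N + 1) / (df(t) + 1)); scikit-learn's TfidfTransformer uses a slightly different formula (it adds 1 to the idf and L2-normalizes each row by default), which is one more reason why the results below differ.

import math

# Three toy "documents", already tokenized
docs = [['spark', 'is', 'fast'],
        ['spark', 'ml', 'is', 'nice'],
        ['baseball', 'is', 'fun']]
N = len(docs)

# 'spark' is fairly frequent but not universal: it gets a positive tf-idf weight
tf_spark = docs[0].count('spark')                 # term frequency in the first document -> 1
df_spark = sum('spark' in doc for doc in docs)    # document frequency -> 2
print tf_spark * math.log((N + 1) / (df_spark + 1.0))   # ~0.29

# 'is' appears in every document: its idf, hence its tf-idf, is 0
df_is = sum('is' in doc for doc in docs)          # -> 3
print docs[0].count('is') * math.log((N + 1) / (df_is + 1.0))   # 0.0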

scikit-learn

In [13]:
# Tokenizing and Occurrence Counts
from sklearn.feature_extraction.text import CountVectorizer
count_vect = CountVectorizer()
X_train_counts = count_vect.fit_transform(X_train)

# TF-IDF
from sklearn.feature_extraction.text import TfidfTransformer
tfidf_transformer = TfidfTransformer()
X_train_tfidf = tfidf_transformer.fit_transform(X_train_counts)
Spark ML

In [14]:
# Tokenizing
from pyspark.ml.feature import Tokenizer
tokenizer = Tokenizer(inputCol='news', outputCol='news_words')
df_train_words = tokenizer.transform(df_train)

# Hashing Term-Frequency
from pyspark.ml.feature import HashingTF
hashing_tf = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol='news_tf', numFeatures=10000)
df_train_tf = hashing_tf.transform(df_train_words)

# Inverse Document Frequency
from pyspark.ml.feature import IDF
idf = IDF(inputCol=hashing_tf.getOutputCol(), outputCol="news_tfidf")
idf_model = idf.fit(df_train_tf) # fit to build the model on all the data, and then apply it line by line
df_train_tfidf = idf_model.transform(df_train_tf)

In [15]:
df_train_tfidf.show(5)


+--------------------+------+--------------------+--------------------+--------------------+
|                news|target|          news_words|             news_tf|          news_tfidf|
+--------------------+------+--------------------+--------------------+--------------------+
|Distribution: wor...|     1|[distribution:, w...|(10000,[0,55,97,1...|(10000,[0,55,97,1...|
|Distribution: wor...|     1|[distribution:, w...|(10000,[0,55,251,...|(10000,[0,55,251,...|
|Distribution: wor...|     1|[distribution:, w...|(10000,[0,56,152,...|(10000,[0,56,152,...|
|Distribution: wor...|     1|[distribution:, w...|(10000,[0,224,251...|(10000,[0,224,251...|
|Distribution: wor...|     1|[distribution:, w...|(10000,[0,97,217,...|(10000,[0,97,217,...|
+--------------------+------+--------------------+--------------------+--------------------+
only showing top 5 rows
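As a side note, more recent Spark versions also provide a CountVectorizer in pyspark.ml.feature, which is a closer analogue to scikit-learn's CountVectorizer than HashingTF (it builds an explicit vocabulary instead of hashing the terms). Here is a hedged sketch of how it could replace HashingTF above, reusing df_train_words; it is not used in the rest of this notebook.

from pyspark.ml.feature import CountVectorizer

# CountVectorizer is an Estimator: fit() builds the vocabulary, transform() produces the counts
count_vectorizer = CountVectorizer(inputCol='news_words', outputCol='news_counts', vocabSize=10000)
cv_model = count_vectorizer.fit(df_train_words)
df_train_counts = cv_model.transform(df_train_words)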

Modelling & Prediction


Now that the data is ready to be used, we can start the modelling step. For this example, we will use a simple algorithm: a Decision Tree. Both scikit-learn and Spark ML have a DecisionTreeClassifier object for this.

The parameters to specify for this classifier are the same in both libraries, though with slightly different names. The way to use them is exactly the same.

One slight difference though: in Spark ML, we need to specify that the target column is categorical, even when we use a Classifier. This is because the classifier in Spark ML needs to know the number of classes. One way to do this is to use a StringIndexer, which converts the column into a double column carrying the number of classes in its metadata.

If you don't do this, you will get an error like: "DecisionTreeClassifier was given input with invalid label column target, without the number of classes specified. See StringIndexer."

One last important note: always perform the learning on the training set and the predictions on the test set. The test set needs to be transformed in the same way as the training set before the model can use it to make predictions.

scikit-learn

In [16]:
# Training a Decision Tree on training set
from sklearn.tree import DecisionTreeClassifier
clf = DecisionTreeClassifier(max_depth=10).fit(X_train_tfidf, y_train)

# Transform test set
X_test_counts = count_vect.transform(X_test)
X_test_tfidf = tfidf_transformer.transform(X_test_counts)

# Predictions on the test set
y_test_pred = clf.predict(X_test_tfidf)
Spark ML

In [17]:
# Indexing the target
from pyspark.ml.feature import StringIndexer
string_indexer = StringIndexer(inputCol='target', outputCol='target_indexed')
string_indexer_model = string_indexer.fit(df_train_tfidf)
df_train_final = string_indexer_model.transform(df_train_tfidf)

In [18]:
# Training a Decision Tree on training set
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(featuresCol=idf.getOutputCol(), labelCol=string_indexer.getOutputCol())
dt_model = dt.fit(df_train_final)

# Transform the test set
df_test_words = tokenizer.transform(df_test)
df_test_tf = hashing_tf.transform(df_test_words)
df_test_tfidf = idf_model.transform(df_test_tf)
df_test_final = string_indexer_model.transform(df_test_tfidf)

# Predictions on the test set
df_test_pred = dt_model.transform(df_test_final)

In [19]:
df_test_pred.select('news', 'target', 'prediction', 'probability').show(5)


+--------------------+------+----------+--------------------+
|                news|target|prediction|         probability|
+--------------------+------+----------+--------------------+
|From: "Daniel U. ...|     2|       3.0|[0.01863354037267...|
|From: CSP1DWD@MVS...|     1|       2.0|[0.09967320261437...|
|From: DAK988S@vma...|     3|       2.0|[0.09967320261437...|
|From: DPierce@wor...|     5|       0.0|[0.56428571428571...|
|From: Daniel Salb...|     1|       2.0|[0.09967320261437...|
+--------------------+------+----------+--------------------+
only showing top 5 rows

Pipeline


As we can see, the number of steps to perform can quickly grow, especially in the Feature Engineering part. Chaining all the required steps on the training set to train a model, and then performing them all again on the test set to make predictions, can become quite tedious.

The Pipeline object is here to make our lives easier on this point. It gathers all the transformation and learning steps into a single estimator, which can then be applied directly to the raw data of the training and test sets.

The steps to perform are the following:

  • Create an instance of each Transformer / Estimator to use
  • Group them into a Pipeline object
  • Call the pipeline's fit() method to run the transformations and the learning on the training set
  • Call transform() (predict() in scikit-learn) on the test set to perform the predictions

When fit() is called, the Pipeline object calls, in the specified order, the fit() method of each stage (if it has one) and then its transform() method, feeding the result to the next stage; a rough sketch of this mechanism is given below.
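This is roughly equivalent to the following conceptual sketch (not the actual library code, just the idea behind fitting a pipeline):

def fit_pipeline(stages, df):
    fitted_stages = []
    for stage in stages:
        if hasattr(stage, 'fit'):       # Estimator (e.g. IDF, StringIndexer, the classifier)
            stage = stage.fit(df)       # learn a model on the current data
        fitted_stages.append(stage)
        df = stage.transform(df)        # pass the transformed data on to the next stage
    return fitted_stages                # the fitted stages make up the pipeline model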

scikit-learn

In [20]:
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
from sklearn.tree import DecisionTreeClassifier
from sklearn.pipeline import Pipeline

# Instantiate a Pipeline
text_clf = Pipeline([('vect', CountVectorizer()),
                     ('tfidf', TfidfTransformer()),
                     ('clf', DecisionTreeClassifier(max_depth=10)),
                    ])

# Transform the data and train the classifier on the training set
text_clf = text_clf.fit(X_train, y_train)

# Transform the data and perform predictions on the test set
y_test_pred = text_clf.predict(X_test)
Spark ML

In [23]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline

# Instantiate all the Estimators and Transformers necessary
tokenizer = Tokenizer(inputCol='news', outputCol='news_words')
hashing_tf = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol='news_tf', numFeatures=10000)
idf = IDF(inputCol=hashing_tf.getOutputCol(), outputCol="news_tfidf")
string_indexer = StringIndexer(inputCol='target', outputCol='target_indexed')
dt = DecisionTreeClassifier(featuresCol=idf.getOutputCol(), labelCol=string_indexer.getOutputCol(), maxDepth=10)

# Instantiate a Pipeline
pipeline = Pipeline(stages=[tokenizer, 
                            hashing_tf, 
                            idf, 
                            string_indexer, 
                            dt])

# Transform the data and train the classifier on the training set
pipeline_model = pipeline.fit(df_train)

# Transform the data and perform predictions on the test set
df_test_pred = pipeline_model.transform(df_test)

In [24]:
df_test_pred.show(5)


+--------------------+------+--------------------+--------------------+--------------------+--------------+--------------------+--------------------+----------+
|                news|target|          news_words|             news_tf|          news_tfidf|target_indexed|       rawPrediction|         probability|prediction|
+--------------------+------+--------------------+--------------------+--------------------+--------------+--------------------+--------------------+----------+
|From: "Daniel U. ...|     2|[from:, "daniel, ...|(10000,[0,13,35,3...|(10000,[0,13,35,3...|           3.0|[2.0,3.0,4.0,94.0...|[0.01724137931034...|       3.0|
|From: CSP1DWD@MVS...|     1|[from:, csp1dwd@m...|(10000,[0,54,71,9...|(10000,[0,54,71,9...|           5.0|[213.0,123.0,192....|[0.13715389568576...|       4.0|
|From: DAK988S@vma...|     3|[from:, dak988s@v...|(10000,[0,50,54,6...|(10000,[0,50,54,6...|           2.0|[15.0,17.0,143.0,...|[0.04347826086956...|       2.0|
|From: DPierce@wor...|     5|[from:, dpierce@w...|(10000,[0,62,97,1...|(10000,[0,62,97,1...|           0.0|[79.0,3.0,0.0,6.0...|[0.73148148148148...|       0.0|
|From: Daniel Salb...|     1|[from:, daniel, s...|(10000,[0,49,54,9...|(10000,[0,49,54,9...|           5.0|[213.0,123.0,192....|[0.13715389568576...|       4.0|
+--------------------+------+--------------------+--------------------+--------------------+--------------+--------------------+--------------------+----------+
only showing top 5 rows

Model Evaluation


Once we have built our pipeline, it is time to evaluate it. This is where the test set is crucial. We perform predictions on the test set, as if we didn't know the actual classes, and then compare these predictions with the ground truth. If we did this on the training set, the evaluation would be biased, because we would be predicting on the very data used to build the model. Keeping a test set whose data is not used to build the model lets us observe the generalization capacity of the model.

Both scikit-learn and Spark ML have built-in metrics to score all kinds of predictions. In our case, we will measure the precision of the predictions: the percentage of correctly classified samples. This metric is available through the precision_score function in scikit-learn, and through the MulticlassClassificationEvaluator object in Spark ML.
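Note that with micro averaging on a single-label, multiclass problem, precision boils down to plain accuracy: the fraction of samples whose predicted class matches the true class. A quick equivalent computation on the scikit-learn side, assuming the y_test / y_test_pred arrays from the previous cells:

import numpy as np

# Micro-averaged precision == accuracy here: one prediction per sample, all classes pooled together
accuracy = np.mean(np.asarray(y_test) == np.asarray(y_test_pred))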

scikit-learn

In [25]:
from sklearn.metrics import precision_score

# Evaluate the predictions done on the test set
precision_score(y_test, y_test_pred, average='micro')


Out[25]:
0.5386338185890257
Spark ML

In [26]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Instantiate a MulticlassClassificationEvaluator with precision metric
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='target_indexed', 
                                              metricName='precision')

# Evaluate the predictions done on the test set
evaluator.evaluate(df_test_pred)


Out[26]:
0.4809843400447427

The scores differ mainly because the default parameters are not the same in scikit-learn and Spark ML.

Parameter tuning

We would now like to improve the score of our model. One way to do that is to tune the parameters in order to find their best combination.

Tuning is generally done using the following tools:

  • Grid Search: Specify in a grid all the values of each parameter we want to try
  • Cross Validation: Evaluate every combination of parameters several times, on different splits of the training set

In scikit-learn, one can use the GridSearchCV object. In Spark ML, it is a CrossValidator object. In each case, there are three things we need to specify:

  • The parameters grid (using a ParamGridBuilder object in Spark ML)
  • The estimator (or pipeline)
  • The scoring function to decide which combination gives the best score
scikit-learn

In [27]:
from sklearn.grid_search import GridSearchCV

# Create the parameters grid
parameters = {'tfidf__use_idf': (True, False),
              'clf__max_depth': (10, 20)
             }

# Instanciate a GridSearchCV object with the pipeline, the parameters grid and the scoring function
gs_clf = GridSearchCV(text_clf, parameters, score_func=precision_score, n_jobs=-1)

# Transform the data and train the classifier on the training set
gs_clf = gs_clf.fit(X_train, y_train)

# Transform the data and perform predictions on the test set
y_test_pred = gs_clf.predict(X_test)

# Evaluate the predictions done on the test set
precision_score(y_test_pred, y_test, average='micro')


---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-27-66ed6696b057> in <module>()
      7 
      8 # Instanciate a GridSearchCV object with the pipeline, the parameters grid and the scoring function
----> 9 gs_clf = GridSearchCV(text_clf, parameters, score_func=precision_score, n_jobs=-1)
     10 
     11 # Transform the data and train the classifier on the training set

TypeError: __init__() got an unexpected keyword argument 'score_func'
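The TypeError above comes from the score_func keyword, which GridSearchCV does not accept. A corrected version of the cell, assuming the same text_clf pipeline and train/test split as before, passes a scorer through the scoring argument instead:

from sklearn.grid_search import GridSearchCV
from sklearn.metrics import make_scorer, precision_score

# Create the parameters grid (same as above)
parameters = {'tfidf__use_idf': (True, False),
              'clf__max_depth': (10, 20)
             }

# Instantiate a GridSearchCV object; the scorer wraps precision_score with micro averaging,
# matching the evaluation done earlier in this notebook
gs_clf = GridSearchCV(text_clf, parameters,
                      scoring=make_scorer(precision_score, average='micro'),
                      n_jobs=-1)

# Transform the data and train the classifier on the training set
gs_clf = gs_clf.fit(X_train, y_train)

# Transform the data and perform predictions on the test set
y_test_pred = gs_clf.predict(X_test)

# Evaluate the predictions done on the test set
precision_score(y_test, y_test_pred, average='micro')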
Spark ML

In [28]:
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator

# Instantiation of a ParamGridBuilder

grid = (ParamGridBuilder()
        .baseOn([evaluator.metricName, 'precision'])
        .addGrid(dt.maxDepth, [10, 20])
        .build())

# Instantiation of a CrossValidator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid, evaluator=evaluator)

# Transform the data and train the classifier on the training set
cv_model = cv.fit(df_train)

# Transform the data and perform predictions on the test set
df_test_pred = cv_model.transform(df_test)

# Evaluate the predictions done on the test set
evaluator.evaluate(df_test_pred)


Out[28]:
0.5738255033557047

Again, the results differ: we did not tune the same parameters, and the default values may not be the same. Moreover, we did not use exactly the same objects in the Feature Engineering phase (CountVectorizer on one side, Tokenizer + HashingTF on the other, for example).

Conclusion


As we saw, scikit-learn and Spark ML have a lot in common. There are some slight differences between the two libraries, in terms of implementation and of how the data is handled, but they are minimal. Spark ML was designed to be used in a way close to scikit-learn, and this helps a lot when scaling up with Spark to build complex Machine Learning pipelines.

Spark ML is still under active development and, for now, implements a limited number of algorithms compared to scikit-learn. The list of possibilities offered by Spark ML will expand over time, making it easier and easier to go from scikit-learn to Spark ML.